/*
 * Copyright 2011 PA Consulting Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.prodeagle.java.counters;


import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.logging.Logger;

import com.google.appengine.api.NamespaceManager;
import com.google.appengine.api.datastore.AsyncDatastoreService;
import com.google.appengine.api.datastore.DatastoreServiceFactory;
import com.google.appengine.api.datastore.Entity;
import com.google.appengine.api.datastore.PreparedQuery;
import com.google.appengine.api.datastore.Query;
import com.google.appengine.api.datastore.Query.FilterOperator;
import com.prodeagle.java.DatastoreManager;
import com.prodeagle.java.ProdEagleConstants;
import com.prodeagle.java.DatastoreManager.ResultType;

/**
 * Helper functions and utilities that help with the management 
 * of counters by storing and receiving values into MemCache
 * @author Edward Hartwell Goose
 *
 */
public class CounterNamesManager implements ProdEagleConstants {
	
	private static final Logger _logger = Logger.getLogger(CounterNamesManager.class.getName());
	
	/**
	 * The maximum difference between two slots
	 */
	private static final int MAX_CLOCK_SKEW = 1000 * 60; //60 seconds
	
	private Long lastUpdate;
	/**
	 * The set of counter names known to the instance
	 */
	private Set<String> knownCounterNames;
	/**
	 * The id (key) of the last shard of counter names
	 */
	private Long lastShard;
	
	/**
	 * The time the CounterNamesManager was created - so we can reinitalise it every 1 days
	 */
	public Long creationTime;
	
	public CounterNamesManager() {
		creationTime = new Date().getTime();
		lastUpdate = null;
		knownCounterNames = new HashSet<String>();
		lastShard = null;
	}

	/**
	 * Get all the counter names from the datastore
	 * @return - a set of all the counter names
	 */
	public Iterator<Entity> allCounterNameShards() {
		String originalNamespace = NamespaceManager.get();
		
		try {
			NamespaceManager.set(NAMESPACE);
			
			AsyncDatastoreService asyncDatastoreService = DatastoreServiceFactory.getAsyncDatastoreService();
			
			Query query = new Query("CounterNamesShard");
			
			if (null != this.lastUpdate) {
				Long now = new Date().getTime();
				query.addFilter("timestamp", FilterOperator.GREATER_THAN_OR_EQUAL, (now - (MAX_CLOCK_SKEW)));
			}
			
			PreparedQuery pq = asyncDatastoreService.prepare(query);
			Iterator<Entity> it = pq.asIterator();
			
			return it;
		} finally {
			NamespaceManager.set(originalNamespace);
		}
	}
	
	public Set<String> allCounterNames() {
		Iterator<Entity> it = allCounterNameShards();
		
		while (it.hasNext()) {
			Entity entity = it.next();
			
			@SuppressWarnings("unchecked")
			List<String> names = (List<String>) entity.getProperty("names");
			this.knownCounterNames.addAll(names);
			
			if (null == this.lastShard) {
				this.lastShard = entity.getKey().getId();
			} else {
				this.lastShard = Math.max(this.lastShard, entity.getKey().getId());
			}
			
			this.lastUpdate = new Date().getTime();
		}
			
		return this.knownCounterNames;
	}
	
	/**
	 * Adds counters to the list of known counters (if they don't already exist)
	 * @param names
	 * @return - an object[] { Boolean, Long }, where the Boolean indicates if a read of the datastore was done
	 * and the Long indicates how many counters were written to the database
	 */
	public Object[] addIfNew(Collection<String> names) {
		String originalNamespace = NamespaceManager.get();

		try {
			NamespaceManager.set(NAMESPACE);
			Set<String> newNames = new HashSet<String>();
			Set<String> fresh = null;
			
			for (String name : names) {
				if (!this.knownCounterNames.contains(name)) {
					if (null == fresh) {
						fresh = this.allCounterNames(); //retrieve all the counter names from the datastore
						
						if (!fresh.contains(name)) {
							newNames.add(name);
						}
					} else {
						newNames.add(name);
					}
				}
			}
			
			if (!newNames.isEmpty()) {
				_logger.info("There are " + newNames.size() + " new counters to add");
				if (null == this.lastShard) {
					Entity entity = DatastoreManager.getOrInsertCounterNamesShard(1);
					this.lastShard = entity.getKey().getId();
				}
				
				DatastoreManager.ResultType result = DatastoreManager.addNames(this.lastShard, newNames);
				
				_logger.info("Adding names returned: " + result.toString());
				
				if (result.equals(ResultType.ADD_FULL)) {
					DatastoreManager.getOrInsertCounterNamesShard(this.lastShard + 1);
					addIfNew(names);
				}
				
				if (result.equals(ResultType.ADD_SUCCESS)) {
					_logger.info(newNames.size() + " counters have been added");
				} else {
					_logger.warning("Couldn't register counter names. Will retry next time. Numbers of counters: " + newNames.size());
				}
			}
			
			Boolean b = (null != fresh);
			
			return new Object[] { b, Long.valueOf(newNames.size()) };
		} finally {
			NamespaceManager.set(originalNamespace);
		}
	}

	/**
	 * Delete a set of counters from the counter name shards
	 * 
	 * Note from Andrin:
	 * When a counter is deleted and used again afterwards
	 * it will not be harvested in the following case:
	 * Instance A deletes counter C.
	 * Instance B has C still in knownCounterNames
	 * and increments C without updating the CounterNamesShards.
	 * Instance A generates the harvest report and wont find C.
	 *
	 * To fix this issue, we reload the DefaultCounterNamesManager once a day.
	 * To fix this issue manually, you can just deploy a new version of your app.
	 * 
	 * @param deleteCounters - the set of counters to delete
	 */
	public void delete(Set<String> deleteCounters) {
		String originalNamespace = NamespaceManager.get();
		
		try {
			NamespaceManager.set(NAMESPACE);
			
			for (String deleteCounter : deleteCounters) {
				if (this.allCounterNames().contains(deleteCounter)) {
					this.allCounterNames().remove(deleteCounter);
				}
			}
			
			Set<Entity> dirtyShards = new HashSet<Entity>();
			Iterator<Entity> it = allCounterNameShards();
			while (it.hasNext()) {
				Boolean dirty = false;
				Entity counterNameShard = it.next();
				
				@SuppressWarnings("unchecked")
				List<String> counterNames = (List<String>) counterNameShard.getProperty("names");
				for (String deleteCounter : deleteCounters) {
					if (counterNames.contains(deleteCounter)) {
						counterNames.remove(deleteCounter);
						dirty = true;
					}
				}

				if (dirty) {
					dirtyShards.add(counterNameShard);
				}
			}
			
			if (!dirtyShards.isEmpty()) {
				AsyncDatastoreService ds = DatastoreServiceFactory.getAsyncDatastoreService();
				ds.put(dirtyShards);
			}
		} catch (Exception e) {
			_logger.severe("Unexpected exception while deleting counter: " + e);
		} finally {
			NamespaceManager.set(originalNamespace);
		}
	}
}
